package com.atguigu.app.dim_dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimCreateTableRichMapFunction_02;
import com.atguigu.app.func.DimDwdTableProcessFunction;
import com.atguigu.app.func.DimSinkFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * Flink job that splits the ODS {@code topic_db} stream into HBase-bound (DIM) and
 * Kafka-bound (DWD) outputs, driven by a broadcast configuration stream.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>Reads raw strings from Kafka topic {@link Constant#TOPIC_ODS_DB}.</li>
 *   <li>Parses them into {@link JSONObject}s, dropping null, blank and unparseable records.</li>
 *   <li>Reads the {@code gmall-230315-config.table_process} table via Flink CDC, maps each change
 *       event to a {@link TableProcess} with {@link DimCreateTableRichMapFunction_02}, and
 *       broadcasts the result.</li>
 *   <li>Connects the main stream with the broadcast stream. {@link DimDwdTableProcessFunction}
 *       emits HBase-bound records on the main output and Kafka-bound records on a side output.</li>
 *   <li>Writes the main output with {@link DimSinkFunction}. Writes the side output to Kafka,
 *       using each record's {@code topic} field as the target topic and its {@code data} field,
 *       UTF-8 encoded, as the record value.</li>
 * </ol>
 */
public class DimAndDwdApp {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing (currently commented out)
//        env.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        // The default is Integer.MAX_VALUE
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        env.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 2. Read the ODS-layer topic_db data from Kafka
        DataStreamSource<String> sourceDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_ODS_DB, "dim_dwd_app_230315"), WatermarkStrategy.noWatermarks(), "kafka-source");

        // 3. Convert each record to a JSON object and filter out dirty data (main stream)
        SingleOutputStreamOperator<JSONObject> jsonObjDS = sourceDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                // Null or blank records carry no JSON payload; skip them instead of parsing.
                if (value == null || value.trim().isEmpty()) {
                    return;
                }
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // JSON.parseObject can return null (e.g. for the literal "null").
                    // Never emit null downstream: the broadcast process function would fail on it.
                    if (jsonObject != null) {
                        out.collect(jsonObject);
                    }
                } catch (JSONException e) {
                    System.out.println("脏数据：" + value);
                }
            }
        });

        jsonObjDS.print("jsonObjDS>>>>");

        // 4. Read the configuration table from MySQL with Flink CDC to build the config stream
        //    (create tables & broadcast)
        // NOTE(review): credentials are hard-coded; consider moving them to Constant or external config.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username("root")
                .password("000000")
                .databaseList("gmall-230315-config")
                .tableList("gmall-230315-config.table_process")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");

        SingleOutputStreamOperator<TableProcess> tableProcessDS = mysqlDS.map(new DimCreateTableRichMapFunction_02());
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<TableProcess> broadcastDS = tableProcessDS.broadcast(mapStateDescriptor);

        // 5. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, TableProcess> connectDS = jsonObjDS.connect(broadcastDS);

        // 6. Process the connected stream: filter main-stream records using the broadcast config,
        //    routing HBase data to the main output and Kafka data to a side output
        OutputTag<JSONObject> kafkaTag = new OutputTag<JSONObject>("kafka-ds") {
        };
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectDS.process(new DimDwdTableProcessFunction(mapStateDescriptor, kafkaTag));

        // 7. Fetch the side output, then write the main output to HBase and the side output to Kafka
        SideOutputDataStream<JSONObject> kafkaDS = hbaseDS.getSideOutput(kafkaTag);
        hbaseDS.print("HBase>>>>>");
        kafkaDS.print("Kafka>>>>>");
        hbaseDS.addSink(new DimSinkFunction());
        kafkaDS.sinkTo(KafkaUtil.getKafkaSink(new KafkaRecordSerializationSchema<JSONObject>() {
            @Nullable
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {
                // Encode explicitly as UTF-8. String#getBytes() without an argument uses the
                // platform default charset, which garbles non-ASCII data on non-UTF-8 JVMs.
                return new ProducerRecord<>(element.getString("topic"),
                        element.getString("data").getBytes(StandardCharsets.UTF_8));
            }
        }));

        // 8. Launch the job
        env.execute("DimAndDwdApp");

    }
}
